package com.tpvlog.dfs.namenode.register;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 负责管理集群里的所有DataNode
 *
 * @author Ressmix
 */
public class DataNodeManager {
    // Number of replicas to maintain for each file
    public static final Integer REPLICA_NUM = 2;

    // All DataNodes in the cluster, keyed by "IP-HOSTNAME"
    private Map<String, DataNodeInfo> datanodes = new ConcurrentHashMap<>();

    // Filename -> list of DataNodes holding a replica of that file
    private Map<String, List<DataNodeInfo>> datanodeMappedByFile = new ConcurrentHashMap<>();

    // DataNode id ("IP-HOSTNAME") -> list of "filename_length" entries stored on that node
    private Map<String, List<String>> fileMappedByDataNode = new ConcurrentHashMap<>();

    // Guards the two file/replica mapping structures above
    private ReentrantReadWriteLock rrw = new ReentrantReadWriteLock();

    public DataNodeManager() {
        // Start the heartbeat/liveness monitoring background thread.
        // NOTE(review): starting a thread from a constructor publishes a reference to a
        // not-yet-fully-constructed instance — consider a separate start() method; confirm
        // no subclassing/field initialization races exist before relying on this.
        new DataNodeAliveMonitor().start();
    }

    /**
     * datanode注册
     */
    /**
     * Registers a DataNode with the cluster.
     *
     * Uses {@link ConcurrentHashMap#putIfAbsent} so the existence check and the
     * insertion are one atomic step — the original check-then-put sequence could
     * let two concurrent registrations of the same node both succeed.
     *
     * @param ip       node IP address
     * @param hostname node hostname
     * @param nioPort  node NIO data port
     * @return true if this is a new registration, false if the node already exists
     */
    public Boolean register(String ip, String hostname, int nioPort) {
        DataNodeInfo existing = datanodes.putIfAbsent(
                ip + "-" + hostname, new DataNodeInfo(ip, hostname, nioPort));
        if (existing != null) {
            System.out.println("当前已经存在这个DataNode了......");
            return false;
        }
        System.out.println("DataNode注册：ip=" + ip + ",hostname=" + hostname + ", nioPort=" + nioPort);
        return true;
    }

    /**
     * datanode心跳
     */
    /**
     * Records a heartbeat from a DataNode by refreshing its last-heartbeat timestamp.
     *
     * @return false if the node is not registered (caller should re-register), true otherwise
     */
    public Boolean heartbeat(String ip, String hostname, int nioPort) {
        DataNodeInfo node = getDataNodeInfo(ip, hostname);
        if (node == null) {
            // Unknown node — likely evicted by the alive monitor; refuse the heartbeat.
            return false;
        }
        node.setLatestHeartbeatTime(System.currentTimeMillis());
        System.out.println("DataNode发送心跳：ip=" + ip + ",hostname=" + hostname);
        return true;
    }

    /**
     * 获取可供上传的DataNode节点及副本
     */
    /**
     * Picks up to {@link #REPLICA_NUM} DataNodes (the least-loaded ones) to receive
     * an uploaded file, excluding one faulty/excluded node.
     *
     * Each chosen node's stored-size counter is bumped immediately so concurrent
     * allocations see the pending upload. BUG FIX: the original only did this
     * accounting when two nodes were available, silently skipping it in the
     * single-node case.
     *
     * @param fileSize           size of the file about to be uploaded
     * @param excludedDataNodeId id ("ip-hostname") of a node to exclude, may be unknown
     * @return the selected nodes, possibly fewer than REPLICA_NUM (even empty)
     */
    public List<DataNodeInfo> allocateDataNodes(long fileSize, String excludedDataNodeId) {
        synchronized (this) {
            DataNodeInfo excludedDataNode = datanodes.get(excludedDataNodeId);
            List<DataNodeInfo> candidates = new ArrayList<>();
            for (DataNodeInfo datanode : datanodes.values()) {
                if (!datanode.equals(excludedDataNode)) {
                    candidates.add(datanode);
                }
            }
            // Sort ascending by load (DataNodeInfo is Comparable — see Collections.sort usage).
            Collections.sort(candidates);

            // Take the least-loaded nodes, up to the replica count.
            List<DataNodeInfo> selectedDatanodes = new ArrayList<>();
            int count = Math.min(REPLICA_NUM, candidates.size());
            for (int i = 0; i < count; i++) {
                DataNodeInfo chosen = candidates.get(i);
                selectedDatanodes.add(chosen);
                // Account the upload now, before the DataNode reports it.
                chosen.addStoredDataSize(fileSize);
            }
            return selectedDatanodes;
        }
    }

    /**
     * 获取可供下载的DataNode节点
     *
     * @param filename
     * @return
     */
    /**
     * Picks a random DataNode that holds a replica of the given file, skipping
     * one excluded (faulty) node.
     *
     * BUG FIX: the original {@code finally} block called {@code readLock().lock()}
     * a second time instead of {@code unlock()}, permanently leaking the read
     * lock and eventually deadlocking every writer. Also guards against an
     * unknown filename (NPE) and an empty replica list ({@code nextInt(0)}
     * throws / all-excluded caused an infinite loop).
     *
     * @param filename         file to locate
     * @param excludedDataNode id ("ip-hostname") of the node to avoid
     * @return a usable replica holder, or null if none exists
     */
    public DataNodeInfo getDataNodeForFile(String filename, String excludedDataNode) {
        rrw.readLock().lock();
        try {
            DataNodeInfo excluded = datanodes.get(excludedDataNode);
            List<DataNodeInfo> replicas = datanodeMappedByFile.get(filename);
            if (replicas == null || replicas.isEmpty()) {
                return null;
            }

            // Collect usable candidates up front instead of retrying randomly,
            // which could spin forever when every replica is excluded.
            List<DataNodeInfo> candidates = new ArrayList<>();
            for (DataNodeInfo replica : replicas) {
                if (!replica.equals(excluded)) {
                    candidates.add(replica);
                }
            }
            if (candidates.isEmpty()) {
                return null;
            }

            Random random = new Random();
            return candidates.get(random.nextInt(candidates.size()));
        } finally {
            rrw.readLock().unlock();
        }
    }

    /**
     * 增量上报
     */
    /**
     * Processes an incremental report from a DataNode: one file it now stores.
     *
     * If the file already has {@link #REPLICA_NUM} replicas, the extra copy is
     * scheduled for deletion on the reporting node; otherwise the replica is
     * recorded in both mapping structures.
     *
     * Fixes: uses {@code >=} instead of {@code ==} so an already over-replicated
     * file cannot keep accumulating replicas; ignores reports from unknown nodes
     * (the original dereferenced a possibly-null lookup).
     *
     * @param ip         reporting node IP
     * @param hostname   reporting node hostname
     * @param filename   reported file name
     * @param fileLength reported file length in bytes
     */
    public void deltaReportDataNodeInfo(String ip, String hostname, String filename, Long fileLength) {
        rrw.writeLock().lock();
        try {
            List<DataNodeInfo> replicas =
                    datanodeMappedByFile.computeIfAbsent(filename, k -> new ArrayList<>());

            DataNodeInfo datanode = this.getDataNodeInfo(ip, hostname);
            if (datanode == null) {
                // Node unknown (never registered or already evicted) — drop the report.
                return;
            }

            if (replicas.size() >= REPLICA_NUM) {
                // Replica count already satisfied: undo the size accounting and
                // tell the reporting node to delete its redundant copy.
                datanode.addStoredDataSize(-fileLength);
                RemoveReplicaTask removeReplicaTask = new RemoveReplicaTask(filename, datanode);
                datanode.addRemoveReplicaTask(removeReplicaTask);
            } else {
                // Record the replica in both directions of the mapping.
                replicas.add(datanode);
                fileMappedByDataNode
                        .computeIfAbsent(ip + "-" + hostname, k -> new ArrayList<>())
                        .add(filename + "_" + fileLength);
            }
            System.out.println("收到DataNode增量上报信息，当前的副本信息为：" + datanodeMappedByFile);
        } finally {
            rrw.writeLock().unlock();
        }
    }

    /**
     * 全量上报
     */
    /**
     * Processes a full report from a DataNode: its total stored size plus every
     * file it holds, each replayed through the incremental-report path.
     *
     * BUG FIX: the original passed {@code (hostname, ip)} to
     * {@code deltaReportDataNodeInfo(ip, hostname, ...)}, so every full report
     * looked up a non-existent node. Also guards against an unknown node (NPE).
     *
     * @param ip             reporting node IP
     * @param hostname       reporting node hostname
     * @param filenameList   names of all files stored on the node
     * @param storedDataSize total stored bytes on the node
     */
    public void fullyReportDataNodeInfo(String ip, String hostname, List<String> filenameList, Long storedDataSize) {
        DataNodeInfo datanode = this.getDataNodeInfo(ip, hostname);
        if (datanode == null) {
            return;
        }
        datanode.setStoredDataSize(storedDataSize);
        for (String filename : filenameList) {
            // NOTE(review): the node's TOTAL size is passed as each file's length —
            // per-file lengths are not in the report. Verify against the DataNode
            // sender; replica bookkeeping sizes may be inflated.
            this.deltaReportDataNodeInfo(ip, hostname, filename, storedDataSize);
        }
    }

    /**
     * DataNode集群rebalance
     */
    /**
     * Rebalances stored data across the cluster. Nodes above the average stored
     * size become sources; nodes below it become destinations. Replicate tasks
     * are queued on destinations immediately, while the matching delete tasks
     * are handed to a delay thread that fires ~24h later (letting copies finish
     * first).
     *
     * BUG FIX: guards against an empty cluster, which made the average
     * computation divide by zero.
     */
    public void rebalance() {
        synchronized (this) {
            // Nothing to balance (and avoids division by zero below).
            if (datanodes.isEmpty()) {
                return;
            }

            // 1. compute the cluster-wide average stored data size
            long totalStoredDataSize = 0;
            for (DataNodeInfo datanode : datanodes.values()) {
                totalStoredDataSize += datanode.getStoredDataSize();
            }
            long averageStoredDataSize = totalStoredDataSize / datanodes.size();

            // 2. split nodes into sources (above average) and destinations (below average);
            //    nodes exactly at the average participate in neither list
            List<DataNodeInfo> sourceDatanodes = new ArrayList<>();
            List<DataNodeInfo> destDatanodes = new ArrayList<>();
            for (DataNodeInfo datanode : datanodes.values()) {
                if (datanode.getStoredDataSize() > averageStoredDataSize) {
                    sourceDatanodes.add(datanode);
                } else if (datanode.getStoredDataSize() < averageStoredDataSize) {
                    destDatanodes.add(datanode);
                }
            }

            // 3. pair each source with destinations: replicate tasks go to the
            //    destination now, delete tasks are collected for delayed execution
            List<RemoveReplicaTask> removeReplicaTasks = new ArrayList<>();
            for (DataNodeInfo sourceDatanode : sourceDatanodes) {
                long toRemoveDataSize = sourceDatanode.getStoredDataSize() - averageStoredDataSize;
                for (DataNodeInfo destDatanode : destDatanodes) {
                    if (destDatanode.getStoredDataSize() + toRemoveDataSize <= averageStoredDataSize) {
                        // the whole surplus fits on this destination
                        createRebalanceTasks(sourceDatanode, destDatanode,
                                removeReplicaTasks, toRemoveDataSize);
                        break;
                    } else {
                        // move only as much as this destination can absorb, keep going
                        long maxRemoveDataSize = averageStoredDataSize - destDatanode.getStoredDataSize();
                        long removedDataSize = createRebalanceTasks(sourceDatanode, destDatanode,
                                removeReplicaTasks, maxRemoveDataSize);
                        toRemoveDataSize -= removedDataSize;
                    }
                }
            }

            // execute the replica deletions ~24 hours from now
            new DelayRemoveReplicaThread(removeReplicaTasks).start();
        }
    }

    /**
     * 延迟删除副本的线程
     */
    /**
     * Background thread that waits roughly 24 hours (polling once a minute)
     * before handing the accumulated remove-replica tasks to their target
     * DataNodes' task queues.
     */
    class DelayRemoveReplicaThread extends Thread {
        private List<RemoveReplicaTask> removeReplicaTasks;

        public DelayRemoveReplicaThread(List<RemoveReplicaTask> removeReplicaTasks) {
            this.removeReplicaTasks = removeReplicaTasks;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            boolean dispatched = false;
            while (!dispatched) {
                try {
                    // Once 24 hours have elapsed, dispatch every delete task.
                    if (System.currentTimeMillis() - start > 24 * 60 * 60 * 1000) {
                        for (RemoveReplicaTask task : removeReplicaTasks) {
                            task.getTargetDataNode().addRemoveReplicaTask(task);
                        }
                        dispatched = true;
                        continue;
                    }
                    // Poll once per minute until the delay expires.
                    Thread.sleep(60 * 1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Looks up a DataNode by its composite "ip-hostname" key; null if absent. */
    public DataNodeInfo getDataNodeInfo(String ip, String hostname) {
        String key = ip + "-" + hostname;
        return datanodes.get(key);
    }

    /** Stores (or replaces) a DataNode entry under its "ip-hostname" key. */
    public void setDataNodeInfo(String ip, String hostname, DataNodeInfo dataNodeInfo) {
        String key = ip + "-" + hostname;
        datanodes.put(key, dataNodeInfo);
    }

    /** Returns whether a DataNode with the given ip/hostname is registered. */
    public Boolean containDataNode(String ip, String hostname) {
        String key = ip + "-" + hostname;
        return datanodes.containsKey(key);
    }


    /*---------------------------------------------PRIVATE METHOD----------------------------------------------*/

    /**
     * 创建rebalance任务
     *
     * @param sourceDatanode
     * @param destDatanode
     * @param removeReplicaTasks
     * @param maxRemoveDataSize
     * @return
     */
    /**
     * Migrates up to {@code maxRemoveDataSize} bytes worth of files from the
     * source node to the destination node: queues a replicate task on the
     * destination for each file, removes the file from the source's metadata,
     * and collects a remove task (executed later) for the source's copy.
     *
     * BUG FIXES: the file map is keyed by node id (a String), so the original
     * {@code fileMappedByDataNode.get(sourceDatanode)} always returned null and
     * NPE'd; and {@code removeReplicaFromDataNode} mutates the very list being
     * iterated, so we now walk a snapshot to avoid
     * ConcurrentModificationException.
     *
     * @param sourceDatanode     overloaded node to migrate files away from
     * @param destDatanode       underloaded node to receive the files
     * @param removeReplicaTasks accumulator for the delayed delete tasks
     * @param maxRemoveDataSize  migration budget in bytes
     * @return total bytes actually scheduled for migration
     */
    private long createRebalanceTasks(DataNodeInfo sourceDatanode, DataNodeInfo destDatanode,
                                      List<RemoveReplicaTask> removeReplicaTasks, long maxRemoveDataSize) {
        // 1. files on the source node, keyed by its id ("ip-hostname")
        List<String> files = fileMappedByDataNode.get(sourceDatanode.getId());
        if (files == null) {
            return 0;
        }

        // 2. iterate a snapshot — removeReplicaFromDataNode() mutates the live list
        long removedDataSize = 0;
        for (String file : new ArrayList<>(files)) {
            // entries have the form "filename_length"
            String filename = file.split("_")[0];
            long fileLength = Long.valueOf(file.split("_")[1]);
            if (removedDataSize + fileLength >= maxRemoveDataSize) {
                break;
            }

            // replicate the file onto the destination and account the size now
            ReplicateTask replicateTask = new ReplicateTask(
                    filename, fileLength, sourceDatanode, destDatanode);
            destDatanode.addReplicateTask(replicateTask);
            destDatanode.addStoredDataSize(fileLength);

            // drop the source's copy from the metadata and queue its deletion
            sourceDatanode.addStoredDataSize(-fileLength);
            this.removeReplicaFromDataNode(sourceDatanode.getId(), file);
            RemoveReplicaTask removeReplicaTask = new RemoveReplicaTask(
                    filename, sourceDatanode);
            removeReplicaTasks.add(removeReplicaTask);

            removedDataSize += fileLength;
        }

        return removedDataSize;
    }

    /**
     * 从数据节点删除掉一个文件副本
     *
     * @param id
     */
    /**
     * Removes one file replica record from a DataNode's metadata: deletes the
     * "filename_length" entry from the node's file list and drops the node from
     * the file's replica list.
     *
     * Adds null guards for both lookups (the original NPE'd on unknown ids or
     * filenames) and uses {@code removeIf} instead of a manual iterator.
     *
     * @param id   DataNode id ("ip-hostname")
     * @param file file entry of the form "filename_length"
     */
    private void removeReplicaFromDataNode(String id, String file) {
        rrw.writeLock().lock();
        try {
            List<String> files = fileMappedByDataNode.get(id);
            if (files != null) {
                files.remove(file);
            }
            // replica map is keyed by the bare filename, without the "_length" suffix
            List<DataNodeInfo> replicas = datanodeMappedByFile.get(file.split("_")[0]);
            if (replicas != null) {
                replicas.removeIf(replica -> replica.getId().equals(id));
            }
        } finally {
            rrw.writeLock().unlock();
        }
    }

    /**
     * datanode是否存活的监控线程
     */
    /**
     * Liveness monitor for the DataNode cluster. Every 30 seconds it scans all
     * registered nodes; any node that has not sent a heartbeat for more than
     * 90 seconds is treated as dead: its file replicas are re-replicated onto
     * surviving nodes and its metadata is purged.
     */
    private class DataNodeAliveMonitor extends Thread {
        @Override
        public void run() {
            while (true) {
                // The try lives INSIDE the loop so a single failed iteration
                // cannot kill liveness monitoring for good (the original wrapped
                // the whole loop, and the thread died on the first exception).
                try {
                    List<DataNodeInfo> toRemoveDatanodes = new ArrayList<>();
                    for (DataNodeInfo datanode : datanodes.values()) {
                        // A node is presumed dead after 90s without a heartbeat.
                        if (System.currentTimeMillis() - datanode.getLatestHeartbeatTime() > 90 * 1000) {
                            toRemoveDatanodes.add(datanode);
                        }
                    }
                    for (DataNodeInfo toRemoveDatanode : toRemoveDatanodes) {
                        System.out.println("数据节点【" + toRemoveDatanode + "】宕机，需要进行副本复制......");
                        // 1. queue replica-copy tasks for every file the dead node held
                        createLostReplicaTask(toRemoveDatanode);
                        // 2. purge the dead node from the cluster metadata
                        removeDeadDataNode(toRemoveDatanode);
                    }
                    // check every 30 seconds
                    Thread.sleep(30 * 1000);
                } catch (InterruptedException e) {
                    // restore the interrupt flag and stop monitoring cleanly
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Queues one replicate task per file stored on the dead node, copying
         * from a surviving replica holder to a healthy target node. Files with
         * no surviving replica, or with no eligible target, are skipped.
         */
        private void createLostReplicaTask(DataNodeInfo deadDataNode) {
            List<String> files = fileMappedByDataNode.get(deadDataNode.getId());
            if (files == null) {
                return; // dead node had reported no files
            }
            // entries have the form "filename_length"
            for (String file : files) {
                String filename = file.split("_")[0];
                Long filesize = Long.valueOf(file.split("_")[1]);
                // source: a surviving node that still holds a replica
                DataNodeInfo sourceDatanode = getReplicateSource(filename, deadDataNode);
                if (sourceDatanode == null) {
                    continue; // no surviving replica to copy from
                }
                // target: any healthy node other than the source and the dead node
                DataNodeInfo destDatanode = getReplicateTarget(filesize, sourceDatanode, deadDataNode);
                if (destDatanode == null) {
                    continue; // no eligible target node available
                }
                ReplicateTask replicateTask = new ReplicateTask(filename, filesize, sourceDatanode, destDatanode);
                destDatanode.addReplicateTask(replicateTask);
                System.out.println("为目标数据节点生成一个副本复制任务，" + replicateTask);
            }
        }

        /**
         * Picks a replication target: any node that is neither the source nor
         * the dead node. The chosen node's stored size is bumped immediately so
         * later placement decisions see the pending copy.
         */
        private DataNodeInfo getReplicateTarget(Long filesize, DataNodeInfo sourceDatanode, DataNodeInfo deadDataNode) {
            // Synchronize on the enclosing manager — the monitor used by
            // allocateDataNodes()/rebalance(). The original locked the monitor
            // thread object itself, which excluded nothing.
            synchronized (DataNodeManager.this) {
                DataNodeInfo targetDataNode = null;
                for (DataNodeInfo datanode : datanodes.values()) {
                    if (!datanode.equals(sourceDatanode) && !datanode.equals(deadDataNode)) {
                        targetDataNode = datanode;
                        break;
                    }
                }
                if (targetDataNode != null) {
                    targetDataNode.addStoredDataSize(filesize);
                }
                return targetDataNode;
            }
        }

        /**
         * Purges a dead node from the cluster metadata: the node map itself,
         * its entries in every file's replica list, and its own file list.
         */
        private void removeDeadDataNode(DataNodeInfo datanode) {
            rrw.writeLock().lock();
            try {
                datanodes.remove(datanode.getId());
                List<String> filenames = fileMappedByDataNode.get(datanode.getId());
                if (filenames != null) {
                    for (String filename : filenames) {
                        // entries are "filename_length"; replica map is keyed by bare filename
                        List<DataNodeInfo> replicas = datanodeMappedByFile.get(filename.split("_")[0]);
                        if (replicas != null) {
                            replicas.remove(datanode);
                        }
                    }
                }
                fileMappedByDataNode.remove(datanode.getId());

                System.out.println("从集群元数据中删除宕机节点元数据，" + datanodeMappedByFile + "，" + fileMappedByDataNode);
            } finally {
                rrw.writeLock().unlock();
            }
        }

        /**
         * Finds a surviving node that still holds a replica of the given file,
         * i.e. any replica owner other than the dead node; null if none exists.
         */
        public DataNodeInfo getReplicateSource(String filename, DataNodeInfo deadDatanode) {
            DataNodeInfo replicateSource = null;
            rrw.readLock().lock();
            try {
                List<DataNodeInfo> replicas = datanodeMappedByFile.get(filename);
                if (replicas != null) {
                    for (DataNodeInfo replica : replicas) {
                        if (!replica.equals(deadDatanode)) {
                            replicateSource = replica;
                            break;
                        }
                    }
                }
            } finally {
                rrw.readLock().unlock();
            }
            return replicateSource;
        }
    }
}
